package network

import (
	"bytes"
	"context"
	"errors"
	"io"
	"net"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"gitee.com/simonxie979/skymeta/uuid"
)

// Substrings searched for in the text of read errors to classify why a read
// on the connection failed.
const (
	// timeout is the text matched to recognise an expired I/O deadline.
	timeout = "i/o timeout"

	// forcibly is the text matched to recognise a connection that the peer
	// shut down forcibly.
	forcibly = "forcibly closed by the remote host"
)

// session is a single connection together with the two goroutines (read loop
// and write loop) that service it. T is the message type that is queued for
// sending and handed to the host for packing.
type session[T any] struct {
	// ctx is derived from the context passed to newSession; cancel ends it.
	// Both loops watch ctx.Done() to know when to stop.
	ctx    context.Context
	cancel context.CancelFunc
	// wg counts the read and write loops so Close can wait for them to exit.
	wg     sync.WaitGroup

	closeFlag uint32           // close flag: 0 while open, switched to 1 atomically by Close
	guid      uint64           // session ID
	conn      net.Conn         // underlying connection
	outpipe   chan T           // queue of outgoing messages, drained by the write loop
	comm      *Communicator[T] // owning communicator; provides the host callbacks and the logger
}

// newSession builds a session around conn on behalf of comm.
//
// The session receives its own cancellable context derived from ctx, a fresh
// GUID, and an outgoing queue buffered for 10000 messages. No goroutine is
// started here; call Start for that.
func newSession[T any](ctx context.Context, conn net.Conn, comm *Communicator[T]) *session[T] {
	sessionCtx, stop := context.WithCancel(ctx)

	return &session[T]{
		ctx:     sessionCtx,
		cancel:  stop,
		guid:    uuid.GetGuid(),
		conn:    conn,
		outpipe: make(chan T, 10000),
		comm:    comm,
	}
}

// GetSessionID returns the GUID that identifies this session.
func (s *session[T]) GetSessionID() uint64 {
	id := s.guid
	return id
}

// GetRemoteIP returns the peer's IP address without the port.
//
// Only TCP peers are recognised. The placeholder "???" is returned when the
// session has no connection or the remote address is not a *net.TCPAddr.
func (s *session[T]) GetRemoteIP() string {
	const unknown = "???"

	if s.conn == nil {
		return unknown
	}

	if tcp, ok := s.conn.RemoteAddr().(*net.TCPAddr); ok {
		return tcp.IP.String()
	}
	return unknown
}

// GetRemoteAddr returns the peer's address in the textual form produced by
// the connection's remote address (for TCP this is "ip:port").
//
// Like GetRemoteIP, it returns the placeholder "???" instead of panicking
// when the session has no connection or the connection reports no remote
// address.
func (s *session[T]) GetRemoteAddr() (result string) {
	if s.conn == nil {
		return "???"
	}

	addr := s.conn.RemoteAddr()
	if addr == nil {
		return "???"
	}
	return addr.String()
}

// Start spawns the session's two worker goroutines: the write loop first,
// then the read loop. Each one is registered with the wait group before it
// is launched so that Close can wait for both to finish.
func (s *session[T]) Start() {
	for _, loop := range []func(){s.writeLoop, s.readLoop} {
		s.wg.Add(1)
		go loop()
	}
}

// Close shuts the session down and reports err to the host as the reason.
//
// Only the first call has any effect; later calls return immediately. The
// sequence is: cancel the session context, expire the connection deadline so
// that a blocked read or write returns, wait for both loops to exit, close
// the connection, and finally invoke the host's OnDisconnect callback.
//
// Because Close waits for the read and write loops, it must not be called
// synchronously from inside either loop (including from a host callback that
// a loop invokes, such as OnMessage); the loops themselves use
// `go s.Close(...)` for that reason.
func (s *session[T]) Close(err error) {
	if !atomic.CompareAndSwapUint32(&s.closeFlag, 0, 1) {
		// Already closed (or being closed) by another caller.
		return
	}

	// Tell the read loop and write loop to stop.
	s.cancel()

	// Expire the deadline so a Read/Write blocked on the socket returns.
	// Best effort: a failure here is not actionable, the connection is closed below.
	_ = s.conn.SetDeadline(time.Now())

	s.wg.Wait()

	// outpipe is intentionally NOT closed. A Send that passed its closed-flag
	// check just before the flag flipped may still be about to write to the
	// channel, and sending on a closed channel panics. Nothing reads the
	// channel once the write loop has exited, so it is simply left for the
	// garbage collector.

	// Best effort: the session is finished regardless of what Close reports.
	_ = s.conn.Close()

	s.comm.host.OnDisconnect(s.guid, err)
}

// Send queues data for delivery to the peer by the write loop.
//
// It returns true once data has been placed on the outgoing queue, and false
// if the session is already closed. When the queue is full, Send waits for
// room, but gives up and returns false as soon as the session context is
// cancelled, so a caller can no longer block forever on a session whose
// write loop has stopped.
func (s *session[T]) Send(data T) bool {
	if atomic.LoadUint32(&s.closeFlag) != 0 {
		return false
	}

	select {
	case s.outpipe <- data:
		return true
	case <-s.ctx.Done():
		return false
	}
}

// writeLoop is the goroutine that delivers queued messages to the connection.
//
// It waits for a message on outpipe, packs it and every message already
// queued behind it into one buffer through the host's Pack callback, and
// writes that buffer to the connection in a single call. The loop ends when
// the session context is cancelled or outpipe is found closed; in the latter
// case it also triggers Close asynchronously. A write error is only logged;
// the loop keeps running.
func (s *session[T]) writeLoop() {
	defer s.wg.Done()

	var buf bytes.Buffer
	for {
		select {
		case <-s.ctx.Done():
			return

		case data, ok := <-s.outpipe:
			if !ok {
				// Close waits for this goroutine, so it must run on its own.
				go s.Close(errors.New("out pipe closed"))
				return
			}

			s.comm.host.Pack(data, &buf)

			// Batch the whole backlog. The count is taken once: re-reading
			// len(s.outpipe) while receiving (as a loop condition against a
			// growing index) stops after roughly half of the queued messages.
			// Receiving exactly `pending` items does not block as long as
			// this loop is the only receiver (no other receiver is visible
			// in this file).
			for pending := len(s.outpipe); pending > 0; pending-- {
				s.comm.host.Pack(<-s.outpipe, &buf)
			}

			// Flush anything that was packed, even a single byte, so no
			// output is left waiting in buf for the next message.
			if buf.Len() == 0 {
				continue
			}
			if _, err := buf.WriteTo(s.conn); err != nil {
				s.comm.log.Errorf("Session", "%X write data error: %v", s.guid, err)
			}
		}
	}
}

// readLoop is the goroutine that receives messages from the connection.
//
// Each iteration asks the host's Unpack callback to read one payload from
// the connection and passes it to OnMessage. A read error ends the loop in
// one of these ways:
//
//   - io.EOF / io.ErrUnexpectedEOF (also when wrapped): the peer shut down;
//     Close is triggered with CloseFrom_Remote.
//   - error text contains "forcibly closed by the remote host": Close is
//     triggered with CloseFrom_RemoteForcibly.
//   - error text contains "i/o timeout": the loop just returns. Close
//     expires the connection deadline to unblock this read, so this is the
//     normal shutdown path.
//   - any other error that is not a net.Error timeout: Close is triggered
//     with that error.
//
// A net.Error timeout that reaches the end of the chain is logged and the
// read is retried. Close is always launched in its own goroutine because it
// waits for this loop to exit.
func (s *session[T]) readLoop() {
	defer s.wg.Done()

	for {
		// Check for cancellation up front: without this, a read that keeps
		// failing with a retryable error would spin forever.
		select {
		case <-s.ctx.Done():
			return
		default:
		}

		payload, err := s.comm.host.Unpack(s.conn)
		if err == nil {
			s.comm.host.OnMessage(s.guid, payload)
			continue
		}

		// errors.Is / errors.As are used so that errors wrapped by the
		// host's Unpack implementation are still classified correctly.
		msg := err.Error()
		var netErr net.Error
		switch {
		case errors.Is(err, io.EOF), errors.Is(err, io.ErrUnexpectedEOF):
			// The peer end is shut down.
			go s.Close(errors.New(CloseFrom_Remote))
			return

		case strings.Contains(msg, forcibly):
			// The peer end was forcibly shut down.
			go s.Close(errors.New(CloseFrom_RemoteForcibly))
			return

		case strings.Contains(msg, timeout):
			// Normal shut down: the deadline was expired to unblock the read.
			// NOTE(review): any other i/o timeout also ends the loop here
			// without closing the session.
			return

		case !errors.As(err, &netErr) || !netErr.Timeout():
			// Abnormal shut down.
			go s.Close(err)
			return
		}

		s.comm.log.Errorf("Session", "read connection[%X] failure. err: %v", s.guid, err)
	}
}
